package sbox

import (
	"errors"
	"fmt"
	"sync"

	perror "gitee.com/dennis-kk/service-box-go/util/errors"
	"gitee.com/dennis-kk/service-box-go/util/service_infra"
	"gitee.com/dennis-kk/service-box-go/util/slog"
)

type (
	// ProxyCallBack is invoked with the service name, the channel that was
	// resolved for it (nil on failure or removal) and an optional error.
	ProxyCallBack func(string, *BoxChannel, error) //get proxy call back
	// ConnectHandle starts a connection; serviceLayer calls it as
	// (service name, network such as "tcp", host address).
	ConnectHandle func(string, string, string)
	// hostInfo holds the channels known for one service plus the cursor
	// used to rotate through them.
	hostInfo      struct {
		// index is the position of the most recently returned channel in trans.
		index int
		// trans is the list of channels currently cached for the service.
		trans []*BoxChannel
	}

	callbackList   []ProxyCallBack      //call back list
	serviceMap     map[string]*hostInfo //service name to transport
	hostServiceMap map[string][]string  //host to service list

	// serviceLayerConfig is the YAML configuration consumed by serviceLayer.init.
	serviceLayerConfig struct {
		MiddlewareType string   `yaml:"type"`
		Prefix         string   `yaml:"prefix"`
		Hosts          []string `yaml:"hosts"`
	}

	// serviceLayer manages service discovery, the outgoing connections to
	// discovered hosts and the callbacks/watchers waiting for them.
	serviceLayer struct {
		hosts          []string                    //service finder host
		svs            service_infra.IServiceInfra //service finder and register
		connector      ConnectHandle               //connect handle function
		services       serviceMap                  //service's channel cache
		// callbacks is keyed by service name; entries are removed once fired.
		callbacks      map[string]callbackList     //call back map
		// uid2Name is keyed by the decimal string form of the service uuid.
		uid2Name       map[string]string           //service uuid to name
		// connectingHost is keyed by host address; the value lists the service
		// names waiting for the in-flight connection to that host.
		connectingHost hostServiceMap              //connecting host
		// registered is keyed by service name; the value lists the hosts this
		// process registered for it (see registerService).
		registered     hostServiceMap              //registered service info
		watcher        *WatcherManager             //service watcher
		// rw guards services, callbacks and connectingHost.
		rw             sync.RWMutex                // read & write lock
		logger         slog.BoxLogger              //serviceLayer children logger
	}
)

// makeServiceLayer builds a serviceLayer with every map allocated and an
// empty WatcherManager attached. The service infra (svs) is left nil until
// init is called; ch is stored as the connection starter and logger as the
// layer's logger.
func makeServiceLayer(ch ConnectHandle, logger slog.BoxLogger) *serviceLayer {
	layer := new(serviceLayer)

	// Collaborators supplied by the caller.
	layer.connector = ch
	layer.logger = logger

	// Bookkeeping containers; svs and rw keep their zero values.
	layer.hosts = []string{}
	layer.services = make(serviceMap)
	layer.callbacks = make(map[string]callbackList)
	layer.uid2Name = make(map[string]string)
	layer.connectingHost = make(hostServiceMap)
	layer.registered = make(hostServiceMap)
	layer.watcher = &WatcherManager{}

	return layer
}

// init creates the service infra selected by cfg.MiddlewareType, initialises
// it with the configured hosts and prefix, subscribes onServiceChanged as its
// listener and finally initialises the watcher manager with finder.
//
// It returns an error when the infra type is unsupported or when the infra
// rejects initialisation or the listener registration.
func (sl *serviceLayer) init(cfg *serviceLayerConfig, finder ProxyFinder) error {
	infra := service_infra.CreateServiceInfra(cfg.MiddlewareType)
	sl.svs = infra
	if infra == nil {
		sl.logger.Error("[ServiceBox] create service infra %s failed", cfg.MiddlewareType)
		return errors.New("unsupported service finder type ")
	}

	initErr := sl.svs.Init(service_infra.WithHosts(cfg.Hosts), service_infra.WithPrefix(cfg.Prefix))
	if initErr != nil {
		return initErr
	}

	listenErr := sl.svs.AddListener(sl.onServiceChanged)
	if listenErr != nil {
		return listenErr
	}

	sl.hosts = cfg.Hosts
	sl.watcher.init(finder, sl.logger)
	return nil
}

// start starts the underlying service infra and reports its error, if any.
func (sl *serviceLayer) start() error {
	return sl.svs.Start()
}

// tick drives one iteration of the service infra; it is a no-op until init
// has created the infra.
func (sl *serviceLayer) tick() {
	if sl.svs == nil {
		return
	}
	sl.svs.Tick()
}

// stop unregisters every service this layer registered, stops the service
// infra and shuts the watcher manager down.
//
// Shutdown is best-effort: a failure to unregister one service is logged and
// the remaining services are still processed, and the infra and the watcher
// manager are always stopped. (Previously the first unregister error aborted
// the whole shutdown silently.)
func (sl *serviceLayer) stop() {
	// svs stays nil when init was never called or failed; tick guards the
	// same way.
	if sl.svs != nil {
		// Unregister all services registered through registerService.
		for name, hosts := range sl.registered {
			err := sl.svs.UnRegisterService(name, &service_infra.ServiceInfo{Hosts: hosts})
			if err != nil {
				sl.logger.Warn("[ServiceLayer] unregister service %s error: %v", name, err)
			}
		}

		if err := sl.svs.Stop(); err != nil {
			sl.logger.Warn("[ServiceLayer] stop service infra module error! ")
		}
	}

	if sl.watcher != nil {
		sl.watcher.shutdown()
	}
}

// addLoadedServiceName records name under the decimal string form of uuid so
// that getServiceName can resolve it later.
func (sl *serviceLayer) addLoadedServiceName(uuid uint64, name string) {
	sl.uid2Name[fmt.Sprint(uuid)] = name
}

// getServiceName returns the name recorded for uuid, or the placeholder
// "<UNKOWN>" when the uuid was never added.
func (sl *serviceLayer) getServiceName(uuid uint64) string {
	name, known := sl.uid2Name[fmt.Sprint(uuid)]
	if !known {
		return "<UNKOWN>"
	}
	return name
}

// registerService announces host as a provider of the service name through
// the service infra and, on success, remembers the pair so that stop can
// unregister it.
//
// It returns an error when the infra has not been created or when the infra
// rejects the registration; nothing is remembered in either case.
func (sl *serviceLayer) registerService(name string, host string) error {
	if sl.svs == nil {
		return fmt.Errorf("invalid service finder")
	}

	info := &service_infra.ServiceInfo{Hosts: []string{host}}
	if err := sl.svs.RegisterService(name, info); err != nil {
		return err
	}

	// Registration succeeded: add the host to the registered list. append
	// on a missing key starts a fresh one-element list.
	sl.registered[name] = append(sl.registered[name], host)
	return nil
}

// addWatcher subscribes watcher to connection events of the service name.
//
// If the service has never been looked up (no cached channels, no connection
// in flight for it and no pending callbacks) a discovery request is issued
// first. After the watcher is added, it is notified once for every channel
// that is already connected and still open.
//
// It returns perror.InvalidWatcherManager when no watcher manager is present,
// an error when discovery is needed but the service infra is missing or
// rejects the request, or the error from the watcher manager itself.
func (sl *serviceLayer) addWatcher(uuid uint64, name string, watcher ServiceWatcher) error {
	if sl.watcher == nil {
		return perror.InvalidWatcherManager
	}

	// If the service was never discovered, run one discovery before adding.
	needDiscover := true
	var discoveredService []*BoxChannel

	sl.rw.RLock()
	if s, ok := sl.services[name]; ok {
		needDiscover = false
		if s != nil {
			// Copy the list: it is used after the lock is released, while
			// onClose may rewrite the cached slice in place.
			discoveredService = append(discoveredService, s.trans...)
		}
	}
	if needDiscover {
		// connectingHost is keyed by host address, so the service name has
		// to be searched for in the per-host lists rather than used as key.
	scan:
		for _, waiting := range sl.connectingHost {
			for _, service := range waiting {
				if service == name {
					needDiscover = false
					break scan
				}
			}
		}
	}
	if _, ok := sl.callbacks[name]; ok {
		needDiscover = false
	}
	sl.rw.RUnlock()

	if needDiscover {
		if sl.svs == nil {
			return fmt.Errorf("invalid service finder")
		}
		if err := sl.svs.FindServiceAsync(name); err != nil {
			return err
		}
	}

	if err := sl.watcher.addWatcher(name, uuid, watcher); err != nil {
		return err
	}

	// Channels discovered earlier will not produce a new connect event, so
	// trigger the notification manually.
	for _, ch := range discoveredService {
		if ch != nil && !ch.IsClose() {
			sl.watcher.onConnected(name, ch)
		}
	}
	return nil
}

// removeWatcher unsubscribes watcher from the service name. It returns
// perror.InvalidWatcherManager when no watcher manager is present.
func (sl *serviceLayer) removeWatcher(name string, watcher ServiceWatcher) error {
	if sl.watcher == nil {
		return perror.InvalidWatcherManager
	}
	sl.watcher.removeWatcher(name, watcher)
	return nil
}

// onServiceChanged is the listener registered with the service infra in
// init. A non-nil err is forwarded to handleSoaError; otherwise the event is
// dispatched on its type: ServiceChange updates the host set, ServiceDelete
// drops the service. Other event types are ignored.
func (sl *serviceLayer) onServiceChanged(name string, eventType service_infra.ChangedType, changeInfo *service_infra.ChangedInfo, err error) {
	if err != nil {
		sl.handleSoaError(name, err)
		return
	}

	if eventType == service_infra.ServiceChange {
		sl.doChangService(name, changeInfo)
	} else if eventType == service_infra.ServiceDelete {
		sl.doDeleteService(name)
	}
}

// handleSoaError fails every callback waiting on the service name with err.
// The callbacks are detached from the map under the write lock and invoked
// after the lock is released.
func (sl *serviceLayer) handleSoaError(name string, err error) {
	sl.rw.Lock()
	// Take the callbacks out while locked; call them outside to keep the
	// critical section small.
	waiting, found := sl.callbacks[name]
	if found {
		delete(sl.callbacks, name)
	}
	sl.rw.Unlock()

	for i := range waiting {
		waiting[i](name, nil, err)
	}
}

// doChangService reacts to a changed host list for the service name by
// connecting to every host in changeInfo.Hosts that has no cached channel
// yet. When the service has no cached channels at all, every announced host
// is connected. A nil changeInfo is ignored.
func (sl *serviceLayer) doChangService(name string, changeInfo *service_infra.ChangedInfo) {
	if changeInfo == nil {
		return
	}

	var toConnect []string

	// Read-only pass; stale entries are pruned by the close handling once
	// their connections go away.
	sl.rw.RLock()
	if known, found := sl.services[name]; found && len(known.trans) != 0 {
	candidates:
		for _, candidate := range changeInfo.Hosts {
			for _, ch := range known.trans {
				if ch.RemoteAddr() == candidate {
					// A channel to this host is already cached.
					continue candidates
				}
			}
			toConnect = append(toConnect, candidate)
			sl.logger.Info("add new node %s to host %s:%s cluster", candidate, name, sl.uid2Name[name])
		}
	} else {
		toConnect = changeInfo.Hosts
	}
	sl.rw.RUnlock()

	// Connect outside the read lock; connectToHost takes the write lock.
	for _, target := range toConnect {
		sl.connectToHost(name, target)
	}
}

// doDeleteService drops everything cached for the service name: its channel
// list, its membership in any in-flight host connection, and its pending
// callbacks, which are invoked with a nil channel and nil error after the
// lock is released.
//
// Only the deleted service is removed from a host's waiting list. A host
// entry is deleted only once no service waits on it any more, so other
// services sharing the same in-flight connection still receive the result.
func (sl *serviceLayer) doDeleteService(name string) {
	sl.rw.Lock()

	// Remove all cached connections of the service.
	delete(sl.services, name)

	// Remove the service from every connecting host. Updating or deleting
	// the current key while ranging over a map is permitted.
	for host, waiting := range sl.connectingHost {
		remaining := waiting[:0]
		for _, service := range waiting {
			if service != name {
				remaining = append(remaining, service)
			}
		}
		if len(remaining) == 0 {
			delete(sl.connectingHost, host)
		} else if len(remaining) != len(waiting) {
			sl.connectingHost[host] = remaining
		}
	}

	// Detach the callbacks under the lock ...
	cbs := sl.callbacks[name]
	delete(sl.callbacks, name)
	sl.rw.Unlock()

	// ... and invoke them outside of it to keep the critical section small
	// and to allow callbacks to call back into the layer.
	for _, cb := range cbs {
		cb(name, nil, nil)
	}

	sl.logger.Info("service %s be removed from cluster", name)
}

// onConnect is called when the connection to host succeeded. The new channel
// is appended to the cache of every service that was waiting on that host,
// the pending callbacks of those services are fired with the channel, and
// the watcher manager is notified for each of them.
//
// When no service is waiting on host any more the call does nothing. The
// returned error is always nil.
func (sl *serviceLayer) onConnect(host string, bch *BoxChannel) error {
	type notification struct {
		name string
		cbs  callbackList
	}
	var pending []notification

	sl.rw.Lock()
	waiting, found := sl.connectingHost[host]
	if !found {
		// The connect finished after the service was already removed.
		sl.rw.Unlock()
		return nil
	}

	for _, svc := range waiting {
		// Cache the channel under the service name.
		if entry := sl.services[svc]; entry != nil {
			entry.trans = append(entry.trans, bch)
		} else {
			sl.services[svc] = &hostInfo{
				index: 0,
				trans: []*BoxChannel{bch},
			}
		}

		// A notification is queued even without callbacks because a
		// watcher may still be interested in the service.
		item := notification{name: svc}
		if cbs, has := sl.callbacks[svc]; has {
			item.cbs = cbs
			delete(sl.callbacks, svc)
		}
		pending = append(pending, item)
	}
	delete(sl.connectingHost, host)
	sl.rw.Unlock()

	// Fire callbacks and watcher notifications outside the lock.
	for i := range pending {
		for _, notify := range pending[i].cbs {
			notify(pending[i].name, bch, nil)
		}
		sl.watcher.onConnected(pending[i].name, bch)
	}
	return nil
}

// onConnectFailed is called when the connection to host could not be
// established. Every callback pending for a service that was waiting on the
// host is fired with a nil channel and err, and the host is dropped from the
// connecting set. The failure is logged in all cases and the returned error
// is always nil.
func (sl *serviceLayer) onConnectFailed(host string, err error) error {
	type failure struct {
		name string
		cbs  callbackList
	}
	var failures []failure

	sl.rw.Lock()
	waiting, found := sl.connectingHost[host]
	if !found {
		// The connect finished after the service was already removed.
		sl.logger.Info("connect to host %s failed %v", host, err)
		sl.rw.Unlock()
		return nil
	}

	for _, svc := range waiting {
		cbs, has := sl.callbacks[svc]
		if !has {
			continue
		}
		failures = append(failures, failure{name: svc, cbs: cbs})
		delete(sl.callbacks, svc)
	}
	delete(sl.connectingHost, host)
	sl.rw.Unlock()

	// Report the failure to the waiting callers outside the lock.
	for i := range failures {
		for _, notify := range failures[i].cbs {
			notify(failures[i].name, nil, err)
		}
	}
	sl.logger.Info("connect to host %s failed %v", host, err)
	return nil
}

// onClose is called when the connection to host has gone away. It drops the
// host from the connecting set (discarding the pending callbacks of the
// services waiting on it), removes every cached channel whose remote address
// is host, and notifies the watcher manager once for each service that lost
// a channel.
//
// The channel list of an affected service is replaced by a freshly built
// slice instead of being edited in place. Slices handed out earlier (the
// cached list is read outside the lock elsewhere in this file) therefore
// keep their contents and are not rewritten underneath their readers.
//
// NOTE(review): callbacks of services still waiting on this host are deleted
// without being invoked, exactly as before — confirm that callers do not
// need a failure notification here.
func (sl *serviceLayer) onClose(host string) {
	sl.rw.Lock()

	// Names of the services that lost at least one channel.
	var ms []string

	// Remove the connecting entry and the callbacks that depended on it.
	if services, ok := sl.connectingHost[host]; ok {
		for _, service := range services {
			delete(sl.callbacks, service)
		}
		delete(sl.connectingHost, host)
	}

	for name, hInfo := range sl.services {
		if hInfo == nil {
			continue
		}

		kept := make([]*BoxChannel, 0, len(hInfo.trans))
		for _, ch := range hInfo.trans {
			if ch.RemoteAddr() != host {
				kept = append(kept, ch)
			}
		}

		// A differing length means at least one channel was removed.
		if len(kept) != len(hInfo.trans) {
			hInfo.trans = kept
			ms = append(ms, name)
		}
	}
	sl.rw.Unlock()

	// Notify watchers outside the lock.
	for idx := range ms {
		sl.watcher.onClose(ms[idx], host)
	}

	sl.logger.Info("host %s has been closed !", host)
}

// tryGetTransport returns the next cached channel of the service name,
// rotating through the cached channels on successive calls. It returns nil
// when nothing is cached for the service; no discovery is started.
//
// The write lock is taken because the rotation cursor (hostInfo.index) is
// modified: under a read lock several callers could update it concurrently.
func (sl *serviceLayer) tryGetTransport(name string) *BoxChannel {
	sl.rw.Lock()
	defer sl.rw.Unlock()

	hInfo := sl.services[name]
	if hInfo == nil || len(hInfo.trans) == 0 {
		return nil
	}

	hInfo.index = (hInfo.index + 1) % len(hInfo.trans)
	return hInfo.trans[hInfo.index]
}

// getConnectedTransports returns the channels currently cached for the
// service name, or nil when none are cached.
//
// The result is a copy of the cached list. The caller uses it after the read
// lock is released, while the cached slice itself is rewritten in place when
// a host closes, so handing out the internal slice would expose the caller
// to concurrent modification.
func (sl *serviceLayer) getConnectedTransports(name string) []*BoxChannel {
	sl.rw.RLock()
	defer sl.rw.RUnlock()

	hInfo := sl.services[name]
	if hInfo == nil || len(hInfo.trans) == 0 {
		return nil
	}

	all := make([]*BoxChannel, len(hInfo.trans))
	copy(all, hInfo.trans)
	return all
}

// getTransport resolves a channel for the service name and hands it to cb.
//
// When a channel is already cached, cb is invoked synchronously with the
// next channel in rotation. Otherwise a discovery request is sent and cb is
// queued; it is invoked later with the channel, or with an error if
// resolution fails.
//
// cb is always invoked without the layer's lock held, so it may safely call
// back into the serviceLayer.
//
// It returns an error when the service infra is missing or rejects the
// discovery request; cb is not queued in that case.
func (sl *serviceLayer) getTransport(name string, cb ProxyCallBack) error {
	// next advances the rotation cursor and returns the selected channel,
	// or nil when nothing is cached. The caller must hold the write lock
	// because the cursor is modified.
	next := func() *BoxChannel {
		hInfo := sl.services[name]
		if hInfo == nil || len(hInfo.trans) == 0 {
			return nil
		}
		hInfo.index = (hInfo.index + 1) % len(hInfo.trans)
		return hInfo.trans[hInfo.index]
	}

	// Fast path: serve from the cache.
	sl.rw.Lock()
	ch := next()
	sl.rw.Unlock()
	if ch != nil {
		cb(name, ch, nil)
		return nil
	}

	// Try to find the service.
	if sl.svs == nil {
		return fmt.Errorf("invalid service finder")
	}

	// Queue the callback only after the request was sent successfully,
	// otherwise callbacks would pile up without ever being triggered.
	if err := sl.svs.FindServiceAsync(name); err != nil {
		return err
	}

	sl.rw.Lock()
	// The service may have been connected while the lock was not held; its
	// queued callbacks have then already been drained, so a callback added
	// now would not be fired by that connection. Re-check before queueing.
	ch = next()
	if ch == nil {
		sl.callbacks[name] = append(sl.callbacks[name], cb)
	}
	sl.rw.Unlock()

	if ch != nil {
		cb(name, ch, nil)
	}
	return nil
}

// connectToHost makes sure the service name ends up with a channel to host.
//
//   - If a connection to host is already in flight, name is added to the
//     services waiting on it (unless already listed) and nothing else is done.
//   - If name already has a cached channel to host, the callbacks pending for
//     name are fired with that channel.
//   - Otherwise host is recorded as connecting and the connector is invoked
//     with network "tcp".
//
// Callbacks and the connector are invoked after the lock is released, so
// they may call back into the serviceLayer without deadlocking on it.
//
// It panics when no connector was configured.
func (sl *serviceLayer) connectToHost(name, host string) {
	if sl.connector == nil {
		panic("no effective connector")
	}

	sl.rw.Lock()

	if connections, ok := sl.connectingHost[host]; ok {
		for _, connection := range connections {
			if connection == name {
				// Already waiting on this in-flight connection.
				sl.rw.Unlock()
				return
			}
		}
		sl.connectingHost[host] = append(connections, name)
		sl.rw.Unlock()
		return
	}

	// Check whether the service is already connected to this host.
	if connected, ok := sl.services[name]; ok && connected != nil {
		for _, trans := range connected.trans {
			if trans.RemoteAddr() != host {
				continue
			}
			// Already connected: detach the pending callbacks under the
			// lock and fire them once it is released.
			cbs := sl.callbacks[name]
			delete(sl.callbacks, name)
			sl.rw.Unlock()

			for _, cb := range cbs {
				cb(name, trans, nil)
			}
			return
		}
	}

	sl.connectingHost[host] = []string{name}
	sl.rw.Unlock()

	sl.connector(name, "tcp", host)
}
